Skip to main content

When to Use Multiple Agents

Use multiple agents when:
  • Different specialized skills needed (research + analysis + writing)
  • Parallel workload distribution (process 10 tasks simultaneously)
  • Clear role separation (supervisor + workers)
Don’t use multiple agents when:
  • Single workflow can handle it
  • Communication overhead exceeds benefit
  • Testing/debugging becomes too complex

Orchestration Approaches

Three main categories: 1. Graph-Based Orchestration
  • Define workflows as state machines
  • Examples: LangGraph
2. DAG-Based Orchestration
  • Define workflows as DAGs
  • Examples: OpenAI Agent Builder, Digibee
3. Code-Logic Orchestration
  • Mix agent execution with code logic
All implement the same patterns. Choose based on your team’s needs.

The Four Workflow Patterns

1. Sequential Workflows

When to use: Each step depends on previous step’s output. LangGraph Implementation:
from langgraph.graph import StateGraph, END
from typing import TypedDict

class DocumentState(TypedDict):
    """Shared state dict that the document-pipeline nodes read and update."""

    raw_text: str  # input document text sent to the model
    entities: dict  # parsed JSON written by the entity-extraction step
    summary: str  # text written by the summary step
    keywords: list  # list written by the keyword step

def extract_entities(state: DocumentState) -> DocumentState:
    """Step 1: Extract named entities from the raw document text.

    Sends ``state["raw_text"]`` to the model, parses the reply as JSON and
    stores the parsed value under ``state["entities"]``.

    Args:
        state: Workflow state; ``raw_text`` is read and ``entities`` is written.

    Returns:
        The same ``state`` object, updated in place.

    Raises:
        json.JSONDecodeError: If the model reply is not valid JSON.
    """
    # NOTE(review): assumes ``client`` is an Anthropic SDK client (the call
    # and response shape match it) — confirm where ``client`` is created.
    response = client.messages.create(
        model="claude-sonnet-4-20250514",
        # The Anthropic Messages API requires max_tokens on every request.
        max_tokens=1024,
        messages=[{
            "role": "user",
            "content": f"Extract people, organizations, locations as JSON: {state['raw_text']}"
        }]
    )

    state["entities"] = json.loads(response.content[0].text)
    return state

def generate_summary(state: DocumentState) -> DocumentState:
    """Step 2: Summarize the document with the extracted entities as context.

    Prepends ``state["entities"]`` to the prompt, asks the model to summarize
    ``state["raw_text"]`` and stores the reply under ``state["summary"]``.

    Args:
        state: Workflow state; ``entities`` and ``raw_text`` are read and
            ``summary`` is written.

    Returns:
        The same ``state`` object, updated in place.
    """
    context = f"Known entities: {state['entities']}"

    # NOTE(review): assumes ``client`` is an Anthropic SDK client — confirm.
    response = client.messages.create(
        model="claude-sonnet-4-20250514",
        # The Anthropic Messages API requires max_tokens on every request.
        max_tokens=1024,
        messages=[{
            "role": "user",
            "content": f"{context}\n\nSummarize: {state['raw_text']}"
        }]
    )

    state["summary"] = response.content[0].text
    return state

def extract_keywords(state: DocumentState) -> DocumentState:
    """Step 3: Extract keywords from the summary.

    Asks the model for five keywords based on ``state["summary"]`` and stores
    them as a list of strings under ``state["keywords"]``.

    Args:
        state: Workflow state; ``summary`` is read and ``keywords`` is written.

    Returns:
        The same ``state`` object, updated in place.
    """
    # NOTE(review): assumes ``client`` is an Anthropic SDK client, which cannot
    # serve OpenAI model ids, so a small Claude model is used — confirm.
    response = client.messages.create(
        model="claude-3-5-haiku-20241022",  # Cheaper for simple task
        # The Anthropic Messages API requires max_tokens on every request.
        max_tokens=256,
        messages=[{
            "role": "user",
            "content": f"Extract 5 keywords: {state['summary']}"
        }]
    )

    # Split on bare commas and trim each piece so "a,b" and "a ,  b" both
    # yield clean keywords; empty fragments are dropped.
    raw_keywords = response.content[0].text
    state["keywords"] = [kw.strip() for kw in raw_keywords.split(",") if kw.strip()]
    return state

# Assemble the graph: register every step under its node name
workflow = StateGraph(DocumentState)
for node_name, node_fn in (
    ("extract_entities", extract_entities),
    ("summarize", generate_summary),
    ("extract_keywords", extract_keywords),
):
    workflow.add_node(node_name, node_fn)

# Wire the steps into a straight line ending at END
workflow.set_entry_point("extract_entities")
for source, target in (
    ("extract_entities", "summarize"),
    ("summarize", "extract_keywords"),
    ("extract_keywords", END),
):
    workflow.add_edge(source, target)

app = workflow.compile()

# Run the pipeline once, starting from an otherwise empty state
initial_state = {
    "raw_text": "Document text...",
    "entities": {},
    "summary": "",
    "keywords": [],
}
result = app.invoke(initial_state)
Code logic with the OpenAI Agents SDK (same pattern):
from agents import Agent, Runner
import asyncio
import json

# Prompt for agent 1: named entities, returned as a bare JSON object
ENTITY_INSTRUCTIONS = (
    "You extract named entities from user-provided text. "
    "Return ONLY a valid JSON object with keys: "
    '{"people": string[], "organizations": string[], "locations": string[]}. '
    "No prose, no backticks."
)

# Prompt for agent 2: summary plus keywords, returned as a bare JSON object
SUMMARIZER_INSTRUCTIONS = (
    "You write a concise summary (≤100 words) using the provided entities as context, "
    "and extract 5 keywords. Return ONLY a valid JSON object with keys: "
    '{"summary": string, "keywords": string[]}. No prose, no backticks.'
)

entities_agent = Agent(
    name="Entity Extractor",
    instructions=ENTITY_INSTRUCTIONS,
    model="gpt-4o-mini",
)

summarizer_agent = Agent(
    name="Summarizer",
    instructions=SUMMARIZER_INSTRUCTIONS,
    model="gpt-4o-mini",
)

async def run_sequential(raw_text: str) -> dict:
    """Run entity extraction, then summarization, and merge both results.

    Args:
        raw_text: Document text passed to both agents.

    Returns:
        A dict with ``entities`` (parsed output of the first agent) plus
        ``summary`` and ``keywords`` taken from the second agent's JSON
        (defaulting to ``""`` and ``[]`` when a key is absent).

    Raises:
        json.JSONDecodeError: If either agent replies with invalid JSON.
    """
    # 1) Run Entity Extractor.
    # ``final_output`` holds the agent's final reply directly. The last item of
    # ``to_input_list()`` is a message whose "content" is a list of content
    # parts, not a string, so it cannot be handed to json.loads.
    entities_run = await Runner.run(entities_agent, raw_text)
    entities = json.loads(entities_run.final_output)

    # 2) Run Summarizer using entities
    summarizer_input = (
        f"Raw text:\n{raw_text}\n\n"
        f"Entities (JSON):\n{json.dumps(entities, ensure_ascii=False)}"
    )
    summary_run = await Runner.run(summarizer_agent, summarizer_input)
    result = json.loads(summary_run.final_output)

    # Combine into a single structured result (mirrors the sequential pattern)
    return {
        "entities": entities,
        "summary": result.get("summary", ""),
        "keywords": result.get("keywords", []),
    }

# Demo: run the two-agent pipeline on one sentence and pretty-print the result
if __name__ == "__main__":
    example_text = "Sundar Pichai from Google met with leaders in Paris and New York."
    final_result = asyncio.run(run_sequential(example_text))
    pretty = json.dumps(final_result, ensure_ascii=False, indent=2)
    print(pretty)

2. Parallel Execution

When to use: Independent tasks that can run simultaneously. Pattern:
async def parallel_analysis(documents: list[str]):
    """Analyze multiple documents concurrently.

    Starts one model request per document and awaits them together.

    Args:
        documents: Document texts to analyze.

    Returns:
        A list of ``{"document": ..., "analysis": ...}`` dicts in the same
        order as ``documents``.
    """

    async def analyze_doc(doc: str) -> dict:
        """Analyze one document and pair it with the model's reply."""
        # NOTE(review): assumes ``client`` is an async Anthropic SDK client
        # (the call is awaited) — confirm where ``client`` is created.
        response = await client.messages.create(
            # The original id "claude-haiku-3-5-20250305" is not a published
            # model id; this is the Claude 3.5 Haiku identifier.
            model="claude-3-5-haiku-20241022",
            # The Anthropic Messages API requires max_tokens on every request.
            max_tokens=1024,
            messages=[{
                "role": "user",
                "content": f"Analyze sentiment and extract topics: {doc}"
            }]
        )
        return {"document": doc, "analysis": response.content[0].text}

    # gather() runs every request concurrently and preserves input order;
    # the first exception raised by any request propagates to the caller.
    tasks = [analyze_doc(doc) for doc in documents]
    results = await asyncio.gather(*tasks)

    return results

# Example: 10 documents at ~3s per request → sequential ≈ 30s, parallel ≈ 3s
Production Consideration: Batch parallel requests to avoid overwhelming APIs:
async def parallel_with_batching(items: list, batch_size: int = 10):
    """Process items in consecutive batches, each batch running concurrently.

    Batches run one after another; within a batch every item is handed to
    ``process`` and the calls are awaited together.

    Args:
        items: Items to process.
        batch_size: Maximum number of concurrent ``process`` calls; must be
            at least 1.

    Returns:
        A list of ``process`` results in the same order as ``items``.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    # A zero step makes range() raise, and a negative step produces no batches
    # at all, which would silently drop every item. Reject both up front.
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    results = []

    for start in range(0, len(items), batch_size):
        batch = items[start:start + batch_size]
        results.extend(await asyncio.gather(*(process(item) for item in batch)))

    return results

# 100 items with batch_size=10 → 10 batches run one after another, each with 10 concurrent calls

3. Conditional Branching

When to use: Different paths based on data or business logic. Customer Support Routing Example:
class SupportTicketState(TypedDict):
    """State dict carried through the support-ticket workflow."""

    message: str  # the customer's ticket text
    category: Literal["technical", "billing", "sales"]  # set by classification
    priority: Literal["low", "medium", "high"]  # set by classification
    resolution: str  # reply text written by a handler node

def classify_ticket(state: SupportTicketState) -> SupportTicketState:
    """Classify an incoming ticket by category and priority.

    Sends ``state["message"]`` to the model, parses the JSON reply and copies
    its ``category`` and ``priority`` values into ``state``.

    Args:
        state: Ticket state; ``message`` is read, ``category`` and
            ``priority`` are written.

    Returns:
        The same ``state`` object, updated in place.

    Raises:
        json.JSONDecodeError: If the model reply is not valid JSON.
        KeyError: If the reply lacks ``category`` or ``priority``.
    """
    # NOTE(review): assumes ``client`` is an Anthropic SDK client, which cannot
    # serve OpenAI model ids, so a small Claude model is used — confirm.
    response = client.messages.create(
        model="claude-3-5-haiku-20241022",
        # The Anthropic Messages API requires max_tokens on every request.
        max_tokens=256,
        messages=[{
            "role": "user",
            "content": f"""Classify this support ticket:

Message: {state['message']}

Return JSON: {{"category": "...", "priority": "..."}}"""
        }]
    )

    result = json.loads(response.content[0].text)
    state["category"] = result["category"]
    state["priority"] = result["priority"]
    return state

def handle_technical(state: SupportTicketState) -> SupportTicketState:
    """Technical support handler.

    Looks up knowledge-base material for the ticket via ``search_kb``, asks
    the model for a step-by-step solution and stores the reply under
    ``state["resolution"]``.

    Args:
        state: Ticket state; ``message`` is read and ``resolution`` is written.

    Returns:
        The same ``state`` object, updated in place.
    """
    # Search knowledge base
    articles = search_kb(state["message"])

    # NOTE(review): assumes ``client`` is an Anthropic SDK client — confirm.
    response = client.messages.create(
        model="claude-sonnet-4-20250514",
        # The Anthropic Messages API requires max_tokens on every request.
        max_tokens=1024,
        messages=[{
            "role": "user",
            "content": f"""Provide technical support.

Issue: {state['message']}
KB Articles: {articles}

Give step-by-step solution."""
        }]
    )

    state["resolution"] = response.content[0].text
    return state

def handle_billing(state: SupportTicketState) -> SupportTicketState:
    """Billing support handler.

    Derives a customer id from the ticket text, fetches that customer's
    billing data, asks the model to explain the charges and stores the reply
    under ``state["resolution"]``.

    Args:
        state: Ticket state; ``message`` is read and ``resolution`` is written.

    Returns:
        The same ``state`` object, updated in place.
    """
    # Different tools: access billing system
    customer_id = extract_customer_id(state["message"])
    billing_info = billing_api.get(customer_id)

    # NOTE(review): assumes ``client`` is an Anthropic SDK client — confirm.
    response = client.messages.create(
        model="claude-sonnet-4-20250514",
        # The Anthropic Messages API requires max_tokens on every request.
        max_tokens=1024,
        messages=[{
            "role": "user",
            "content": f"""Handle billing inquiry.

Issue: {state['message']}
Billing: {billing_info}

Explain charges clearly."""
        }]
    )

    state["resolution"] = response.content[0].text
    return state

def handle_sales(state: SupportTicketState) -> SupportTicketState:
    """Sales inquiry handler.

    Searches the product catalog with the ticket text, asks the model for a
    recommendation and stores the reply under ``state["resolution"]``.

    Args:
        state: Ticket state; ``message`` is read and ``resolution`` is written.

    Returns:
        The same ``state`` object, updated in place.
    """
    products = product_catalog.search(state["message"])

    # NOTE(review): assumes ``client`` is an Anthropic SDK client — confirm.
    response = client.messages.create(
        model="claude-sonnet-4-20250514",
        # The Anthropic Messages API requires max_tokens on every request.
        max_tokens=1024,
        messages=[{
            "role": "user",
            "content": f"""Handle sales inquiry.

Question: {state['message']}
Products: {products}

Recommend solutions."""
        }]
    )

    state["resolution"] = response.content[0].text
    return state

def route_ticket(state: SupportTicketState) -> str:
    """Pick the next node for a classified ticket (deterministic).

    Args:
        state: Ticket state; ``priority`` and ``category`` are read.

    Returns:
        ``"escalate"`` for high-priority tickets and for any category without
        a dedicated handler; otherwise the category name (``"technical"``,
        ``"billing"`` or ``"sales"``).
    """
    # High priority always escalates
    if state["priority"] == "high":
        return "escalate"

    # Route by category. The category comes from parsed model output, so an
    # unexpected value is possible; hand those to a human instead of raising
    # KeyError in the middle of the graph.
    category = state["category"]
    if category in ("technical", "billing", "sales"):
        return category

    return "escalate"

# Assemble the support graph: one classifier node fanning out to handlers
workflow = StateGraph(SupportTicketState)

handlers = {
    "technical": handle_technical,
    "billing": handle_billing,
    "sales": handle_sales,
    "escalate": escalate_to_human,
}

workflow.add_node("classify", classify_ticket)
for node, handler in handlers.items():
    workflow.add_node(node, handler)

workflow.set_entry_point("classify")

# route_ticket returns a label; each label maps to the node of the same name
workflow.add_conditional_edges(
    "classify",
    route_ticket,
    {node: node for node in handlers},
)

# Every handler is terminal
for node in handlers:
    workflow.add_edge(node, END)

app = workflow.compile()
Key Insight: The route_ticket function is pure Python logic, so it is 100% deterministic for a given state. The LLM only classifies the ticket (category and priority); it never chooses the next node directly.

4. Loops and Iteration

When to use: Quality improvement through iteration, retry logic. Code Generation with Validation Loop:
class CodeGenState(TypedDict):
    """State dict carried through the generate/validate loop."""

    requirements: str  # what the generated code should do
    code: str  # latest generated source
    test_results: dict  # outcome of the latest validation
    iteration: int  # number of generation attempts so far
    max_iterations: int  # attempt limit checked by the retry decision
    success: bool  # whether the latest validation passed

def generate_code(state: CodeGenState) -> CodeGenState:
    """Generate code on the first pass, or refine it on later passes.

    On iteration 0 the prompt contains only the requirements; afterwards it
    also includes the previous code and its test results. The code extracted
    from the reply is stored under ``state["code"]`` and ``state["iteration"]``
    is incremented.

    Args:
        state: Workflow state; ``iteration``, ``requirements``, ``code`` and
            ``test_results`` are read, ``code`` and ``iteration`` are written.

    Returns:
        The same ``state`` object, updated in place.
    """

    if state["iteration"] == 0:
        # First attempt
        prompt = f"Write Python code for: {state['requirements']}"
    else:
        # Refinement
        prompt = f"""Previous code had issues: {state['test_results']}

Requirements: {state['requirements']}
Previous code: {state['code']}

Fix the issues."""

    # NOTE(review): assumes ``client`` is an Anthropic SDK client — confirm.
    response = client.messages.create(
        model="claude-sonnet-4-20250514",
        # The Anthropic Messages API requires max_tokens on every request;
        # the budget is generous because generated code can be long.
        max_tokens=4096,
        messages=[{"role": "user", "content": prompt}],
        temperature=0.2
    )

    # Extract code
    code = extract_code_block(response.content[0].text)

    state["code"] = code
    state["iteration"] += 1
    return state

def validate_code(state: CodeGenState) -> CodeGenState:
    """Check the generated code and record the outcome in the state.

    The code is first compiled to catch syntax errors and, if that succeeds,
    executed in an empty namespace. ``tests_pass`` therefore only means the
    code ran without raising an exception.

    Args:
        state: Workflow state; ``code`` is read, ``test_results`` and
            ``success`` are written.

    Returns:
        The same ``state`` object, updated in place.
    """

    results = {"syntax_valid": False, "tests_pass": False, "errors": []}

    # Syntax check
    try:
        compile(state["code"], "<string>", "exec")
        results["syntax_valid"] = True
    except SyntaxError as e:
        results["errors"].append(f"Syntax: {e}")
        state["test_results"] = results
        # Reset the flag on this early exit as well, so a stale True from an
        # earlier state cannot survive a syntax failure.
        state["success"] = False
        return state

    # Functional tests
    # SECURITY(review): exec() runs model-generated code inside this process
    # with full privileges; run it in a sandbox or subprocess in production.
    try:
        exec(state["code"], {})
        results["tests_pass"] = True
    except Exception as e:
        results["errors"].append(f"Runtime: {e}")

    state["test_results"] = results
    state["success"] = results["syntax_valid"] and results["tests_pass"]

    return state

def should_retry(state: CodeGenState) -> str:
    """Return "finish" on success or when attempts are used up, else "retry"."""
    # Stop when validation passed, or give up once the attempt limit is hit.
    if state["success"] or state["iteration"] >= state["max_iterations"]:
        return "finish"
    return "retry"

# Assemble the generate → validate loop
workflow = StateGraph(CodeGenState)
for node_name, node_fn in (
    ("generate", generate_code),
    ("validate", validate_code),
):
    workflow.add_node(node_name, node_fn)

workflow.set_entry_point("generate")
workflow.add_edge("generate", "validate")

# After validation, should_retry either loops back to generation or stops
retry_targets = {
    "retry": "generate",
    "finish": END,
}
workflow.add_conditional_edges("validate", should_retry, retry_targets)

app = workflow.compile()

# Run with an attempt limit so the loop always terminates
initial_state = {
    "requirements": "Calculate fibonacci efficiently",
    "code": "",
    "test_results": {},
    "iteration": 0,
    "max_iterations": 3,
    "success": False,
}
result = app.invoke(initial_state)
Critical: Always set max_iterations to prevent infinite loops. In production, also add cost/time limits.

Combining Patterns

Real workflows combine multiple patterns: Example: Document Processing with Quality Control
workflow = StateGraph(DocState)

# Processing steps (parse → extract → summarize) followed by the quality gate
for step_name, step_fn in (
    ("parse", parse_document),
    ("extract", extract_entities),
    ("summarize", create_summary),
    ("quality_check", validate_quality),
):
    workflow.add_node(step_name, step_fn)

def quality_router(state):
    """Choose approve, retry or human_review from score and attempt count."""
    # A score above 0.8 is accepted outright.
    if state["quality_score"] > 0.8:
        return "approve"
    # Otherwise retry while fewer than 3 iterations have run, then hand over.
    return "retry" if state["iteration"] < 3 else "human_review"

# Terminal outcomes
for outcome_name, outcome_fn in (
    ("approve", approve_doc),
    ("human_review", flag_for_human),
):
    workflow.add_node(outcome_name, outcome_fn)

# Linear part of the graph
workflow.set_entry_point("parse")
for source, target in (
    ("parse", "extract"),
    ("extract", "summarize"),
    ("summarize", "quality_check"),
):
    workflow.add_edge(source, target)

# quality_router's label picks the branch; "retry" restarts from parsing
quality_branches = {
    "approve": "approve",
    "retry": "parse",
    "human_review": "human_review",
}
workflow.add_conditional_edges("quality_check", quality_router, quality_branches)

for terminal in ("approve", "human_review"):
    workflow.add_edge(terminal, END)
This combines:
  • Sequential: parse → extract → summarize
  • Conditional: approve vs retry vs human review
  • Loop: retry up to 3 times

Production Deployment Considerations

1. Observability

Add logging at every node:
def extract_entities(state: DocumentState) -> DocumentState:
    """Run the entity-extraction step and log its duration and outcome.

    Args:
        state: Workflow state; ``raw_text`` is read for logging and
            ``entities`` is written.

    Returns:
        The same ``state`` object, updated in place.

    Raises:
        Exception: Any error from the request or parsing is logged and then
            re-raised unchanged.
    """

    # perf_counter() is monotonic, so the measured duration cannot be skewed
    # by system clock adjustments the way time.time() can.
    start = time.perf_counter()

    try:
        # Execute step (``...`` stands in for the real request arguments)
        response = client.messages.create(...)
        state["entities"] = parse(response)

        # Log success
        logger.info("Entity extraction succeeded", extra={
            "duration_ms": (time.perf_counter() - start) * 1000,
            "entities_found": len(state["entities"]),
            "document_length": len(state["raw_text"])
        })

        return state

    except Exception as e:
        # Log failure; exception() logs at ERROR level and adds the traceback.
        logger.exception("Entity extraction failed", extra={
            "duration_ms": (time.perf_counter() - start) * 1000,
            "error": str(e)
        })
        raise

2. Cost Tracking

Monitor token usage:
class CostTracker:
    """Accumulate estimated model cost per workflow step and in total."""

    # Price per token for each known model family. Defined once on the class
    # rather than rebuilt on every track_step() call.
    # NOTE(review): currency and input-vs-output rate are not stated here —
    # confirm against current provider pricing.
    PRICES = {
        "claude-sonnet-4": 0.003 / 1000,  # per token
        "gpt-4o-mini": 0.00015 / 1000,
    }

    def __init__(self):
        self.total_cost = 0
        self.step_costs = {}

    def _price_for(self, model: str) -> float:
        """Return the per-token price for ``model``.

        Accepts an exact key from ``PRICES`` or a dated/suffixed id such as
        ``"claude-sonnet-4-20250514"``, which is priced like the longest key
        it extends.

        Raises:
            KeyError: If no price is configured for ``model``.
        """
        if model in self.PRICES:
            return self.PRICES[model]

        # Require a "-" after the key so unrelated ids that merely share a
        # leading substring are not matched.
        matches = [name for name in self.PRICES if model.startswith(name + "-")]
        if not matches:
            raise KeyError(f"No price configured for model {model!r}")
        return self.PRICES[max(matches, key=len)]

    def track_step(self, step_name: str, tokens: int, model: str):
        """Add the cost of ``tokens`` tokens on ``model`` to ``step_name``.

        Raises:
            KeyError: If no price is configured for ``model``.
        """
        cost = tokens * self._price_for(model)

        self.total_cost += cost
        self.step_costs[step_name] = self.step_costs.get(step_name, 0) + cost

    def get_report(self) -> dict:
        """Return total cost, cost per step and the most expensive step.

        ``most_expensive`` is ``None`` when no step has been tracked yet.
        """
        most_expensive = (
            max(self.step_costs, key=self.step_costs.get)
            if self.step_costs
            else None
        )
        return {
            "total_cost": self.total_cost,
            "by_step": self.step_costs,
            "most_expensive": most_expensive,
        }

# Usage
tracker = CostTracker()

def expensive_step(state):
    """Call the model, record the tokens it used, and return ``state`` unchanged."""
    # ``...`` stands in for the real request arguments.
    response = client.messages.create(...)

    # NOTE(review): assumes an Anthropic SDK response, whose ``usage`` reports
    # input_tokens and output_tokens separately and has no ``total_tokens``.
    usage = response.usage
    tracker.track_step(
        "entity_extraction",
        usage.input_tokens + usage.output_tokens,
        "claude-sonnet-4"
    )

    return state

3. Timeout Protection

Prevent runaway workflows:
import asyncio

async def execute_with_timeout(workflow, state, timeout_seconds=300):
    """Await ``workflow.ainvoke(state)``, giving up after ``timeout_seconds``.

    Returns the workflow's result, or an error dict with ``success`` set to
    ``False`` when the time limit is exceeded (the timeout is also logged).
    """
    try:
        outcome = await asyncio.wait_for(
            workflow.ainvoke(state),
            timeout=timeout_seconds,
        )
    except asyncio.TimeoutError:
        logger.error(f"Workflow timeout after {timeout_seconds}s")
        return {
            "success": False,
            "error": "timeout",
            "message": "Workflow exceeded time limit",
        }
    return outcome

Check Your Understanding

  1. Design: You need to process 50 documents. Each requires: extract → validate → store. What pattern?
  2. Debugging: Your workflow sometimes hangs. What’s the most likely cause?